package biz

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/gogf/gf/util/gconv"
	"github.com/tx7do/kratos-transport/broker"
	"github.com/tx7do/kratos-transport/broker/nsq"
	v1 "kratos_kafka/api/helloworld/v1"
	"kratos_kafka/internal/utils"
	"kratos_kafka/internal/utils/mq_nsq"
	"time"
)

func (uc *GreeterUsecase) SendMsgToNSQ(ctx context.Context, req *v1.Empty) (*v1.Empty, error) {
	reply := &v1.Empty{}

	currEvent := mq_nsq.BookData{
		Title:       "book_test",
		Desc:        "desc...",
		Authors:     []string{"whw", "naruto", "sasuke"},
		PublishTime: time.Now(),
	}
	body, errBody := json.Marshal(currEvent)
	if errBody != nil {
		return nil, errBody
	}

	// Notice 异步发送消息，需要初始化一个 Channel ！！！！！
	// 这版本封装的NSQ 没有 "发送批次大小" 这个 PublishOption！
	// 但是有一个 "延迟发送"的配置

	// go get github.com/nsqio/go-nsq@v1.1.0
	//donChan := make(chan *goNsq.ProducerTransaction, 1)

	errPublish := uc.broker.Nsq.Publish(
		uc.yamlConf.MessageQueue.Nsq.BookTopic,
		body,
		// TODO 异步发送消息: 实际上不太靠谱(尽量别用) tests/kratos_nsq中测试显示 donChan长度大于1或者测试代码中for循环有time.Sleep时会有消息丢失！
		// TODO 有时间研究下 asyncPublishKey这个key 对应的使用方法 PublishAsync/DeferredPublishAsync
		//nsq.WithAsyncPublish(donChan),

		// Notice 延迟队列
		// Notice deferredPublishKey这个key 对应的使用方法 DeferredPublishAsync/DeferredPublish —— 实际用了"DPUB"命令 应该是NSQ自带的
		nsq.WithDeferredPublish(time.Second*3),
	)
	if errPublish != nil {
		return nil, errPublish
	}

	fmt.Printf("%v 成功往NSQ中发送了消息: %v %v\n", utils.GetCurrentTime(), uc.yamlConf.MessageQueue.Nsq.BookTopic, gconv.String(currEvent))

	return reply, nil

}

// 接收NSQ中的消息
func (uc *GreeterUsecase) ReceiveMsgFromNSQ(ctx context.Context, topic string, headers broker.Headers, msg *mq_nsq.BookData) error {

	if msg == nil || msg.Title == "" || msg.Desc == "" || len(msg.Authors) == 0 {
		fmt.Println("ReceiveMsgFromNSQ NSQ中的消息不正确! msg: ", gconv.String(msg))
		return nil
	}

	fmt.Printf("%v 从NSQ中接收到了正确的数据: %v, %v \n", utils.GetCurrentTime(), topic, gconv.String(msg))

	return nil
}
